feat(AsyncExecutor): AsyncExecutor allows adding tasks after shutdown, which may cause unpredictable exceptions. #1092
base: main
Signed-off-by: CLFutureX <[email protected]>
@ryanhoangt @xingyaoww hey, PTAL, thanks
Based on my analysis of the AsyncExecutor changes, here is my review:
Issues Found
🔴 Critical Issues
- Race condition in `_ensure_loop()` early return (lines 32-33): The optimization that checks `if self._loop is not None and self._loop.is_running()` BEFORE acquiring the lock introduces a critical race condition. Between checking if the loop is running and returning it, another thread could call `_shutdown_loop()`, making the returned loop invalid. This violates the "thread safety" guarantee stated in the class docstring.
  Impact: A caller could receive a reference to a loop that's being shut down or already stopped, leading to RuntimeErrors when trying to schedule tasks.
  Recommendation: Remove the early return at lines 32-33, or move the check inside the lock to maintain thread safety.
- Bypass of shutdown check: The early return at line 33 bypasses the `_shutdown.is_set()` check at lines 35-36. This means that if shutdown has been initiated but the loop hasn't fully stopped yet, `_ensure_loop()` could still return the loop, allowing new tasks to be scheduled during shutdown.
  Recommendation: Ensure the shutdown check always happens before returning a loop reference.
🟡 Important Issues
- Typo in error message (line 36): Error message says "asyncExecutor has been shut down" but should be "AsyncExecutor" (capital A) for consistency with the class name.
- String continuation style (lines 41-42): The warning message uses backslash line continuation (`\`), which is not the preferred Python style. Implicit string concatenation inside parentheses is cleaner and more maintainable.

      # Current:
      logger.warning("The loop is not empty, but it is not in a running state." \
                     " Under normal circumstances, this should not happen.")

      # Better:
      logger.warning(
          "The loop is not empty, but it is not in a running state. "
          "Under normal circumstances, this should not happen."
      )

- Questionable "should not happen" warning: The warning at lines 41-42 claims this scenario "should not happen under normal circumstances," but the new early return optimization actually makes this more likely due to potential race conditions.
🟢 Minor Issues
- Import ordering (line 7): `import time` is placed after `from collections.abc import Callable`, breaking the convention of grouping standard library imports alphabetically. Should be moved up to line 4 (after `import threading`).
- Double-check locking optimization may be premature: The performance benefit of the early return check (lines 32-33) is likely minimal for this use case, while it introduces significant correctness risks. Consider whether this optimization is actually needed based on profiling data.
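As a rough illustration of the recommendation to keep every check inside the lock, here is a minimal sketch. `LoopHolder`, `ensure_loop` and `shutdown` are hypothetical names invented for this example; this is not the PR's code, only one way the idea could look.

```python
import asyncio
import threading


class LoopHolder:
    """Hypothetical stand-in for AsyncExecutor; all names are illustrative."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._shutdown = threading.Event()
        self._loop = None
        self._thread = None

    def ensure_loop(self) -> asyncio.AbstractEventLoop:
        # Both the shutdown check and the is_running() check happen under the
        # lock, so shutdown() cannot slip in between the check and the return.
        with self._lock:
            if self._shutdown.is_set():
                raise RuntimeError("AsyncExecutor has been shut down")
            if self._loop is not None and self._loop.is_running():
                return self._loop

            loop = asyncio.new_event_loop()
            started = threading.Event()

            def _run() -> None:
                asyncio.set_event_loop(loop)
                loop.call_soon(started.set)  # fires once the loop is running
                loop.run_forever()

            thread = threading.Thread(target=_run, daemon=True)
            thread.start()
            if not started.wait(timeout=5.0):
                raise RuntimeError("event loop failed to start")
            self._loop, self._thread = loop, thread
            return loop

    def shutdown(self) -> None:
        # Flip the flag and detach the loop under the same lock.
        with self._lock:
            self._shutdown.set()
            loop, thread = self._loop, self._thread
            self._loop = self._thread = None
        if loop is not None and thread is not None:
            loop.call_soon_threadsafe(loop.stop)
            thread.join(timeout=1.0)
            if not loop.is_running():
                loop.close()
```

The cost is one uncontended lock acquisition per call, which is usually small next to the work of scheduling a coroutine on another thread.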
Signed-off-by: CLFutureX <[email protected]>
Signed-off-by: CLFutureX <[email protected]>
Signed-off-by: CLFutureX <[email protected]>
Good analysis. This is indeed an area that needs further optimization. Here's the new adjustment: since the loop is a shared resource, we should add a lock for every write operation on it.
…nt-sdk into fix_acync_executor
@openhands-agent please help review it
Based on my analysis of the PR changes, here is my review:
Issues Found
🔴 Critical Issues
- Event loop resource leak in `_shutdown_loop` (lines 62-81): The method stops the event loop but never calls `loop.close()`, causing a `ResourceWarning: unclosed event loop`. After stopping the loop and joining the thread, you must add `loop.close()` to properly clean up resources.

      # After line 81, add:
      if loop and not loop.is_closed():
          loop.close()

- Missing error handling for `loop.close()` (line 43): In `_safe_execute_on_loop`, `loop.close()` is called without a try-except block. If the loop has pending callbacks or is in an invalid state, this will raise an exception and crash the method while holding the lock. Wrap this in proper error handling.
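The two critical points above (close the loop after stopping it, and guard `loop.close()` against errors) could be combined roughly as follows. This is a sketch only: `start_loop_thread` and `stop_and_close` are hypothetical helpers written for this example, not functions from the PR.

```python
import asyncio
import logging
import threading

logger = logging.getLogger(__name__)


def start_loop_thread():
    """Start an event loop on a daemon thread and wait until it is running."""
    loop = asyncio.new_event_loop()
    started = threading.Event()

    def _run() -> None:
        asyncio.set_event_loop(loop)
        loop.call_soon(started.set)
        loop.run_forever()

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    started.wait(timeout=5.0)
    return loop, thread


def stop_and_close(loop, thread, join_timeout: float = 1.0) -> bool:
    """Stop the loop, join its thread, then close it. Returns True if closed."""
    if loop.is_closed():
        return True
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=join_timeout)
    if loop.is_running():
        # close() raises RuntimeError on a running loop, so do not attempt it.
        logger.warning("Event loop still running after %.1fs", join_timeout)
        return False
    try:
        loop.close()
    except Exception:
        logger.warning("Error closing event loop", exc_info=True)
        return False
    return True
```

Returning a boolean instead of raising keeps the caller's lock handling simple; whether that fits the real `_shutdown_loop` is a design decision for the PR author.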
🟡 Important Issues
- Coroutine leak when `run_async` is called with pre-created coroutine after shutdown (lines 106-113): If a user creates a coroutine and then calls `run_async(coro)` after shutdown, the coroutine will never be awaited, causing a `RuntimeWarning: coroutine was never awaited`. When raising `RuntimeError` at line 107, you should check if `awaitable_or_fn` is already a coroutine and close it with `awaitable_or_fn.close()`.
- Misleading log message in `_shutdown_loop` (line 65): The message "AsyncExecutor has been shutdown" implies past tense, but it's checking if shutdown was already called. Change to "AsyncExecutor is already shut down" or "Shutdown already in progress" for clarity.
- No timeout for loop startup wait (lines 55-56): The `while not loop.is_running()` loop could potentially hang forever if the thread fails to start properly. Add a timeout mechanism (e.g., 5 seconds) and raise an exception if the loop doesn't start in time.
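Two of the suggestions above are small enough to sketch directly: closing a pre-created coroutine before rejecting it, and bounding the startup wait. Both helper names below are hypothetical, and the 5-second default simply mirrors the figure mentioned in the review.

```python
import inspect
import threading


def reject_after_shutdown(awaitable_or_fn) -> None:
    """Raise RuntimeError, closing a pre-created coroutine first."""
    if inspect.iscoroutine(awaitable_or_fn):
        # A closed coroutine is considered finished, so Python does not emit
        # "RuntimeWarning: coroutine ... was never awaited" for it.
        awaitable_or_fn.close()
    raise RuntimeError("AsyncExecutor has been shut down")


def wait_until_started(started: threading.Event, timeout: float = 5.0) -> None:
    """Block until the loop thread signals startup, or fail after `timeout`."""
    # Event.wait() returns False on timeout, replacing an unbounded
    # `while not loop.is_running()` polling loop.
    if not started.wait(timeout=timeout):
        raise RuntimeError(f"event loop did not start within {timeout}s")
```

The second helper assumes the loop thread sets the event once the loop is running, for example via `loop.call_soon(started.set)` just before `run_forever()`.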
🟢 Minor Issues
- Confusing warning message wording (line 41): "The loop is not empty" is unclear. Consider changing to "The loop exists but is not running. This usually indicates improper cleanup." for better clarity.
- Minor typo in branch name: Branch is named `fix_acync_executor` instead of `fix_async_executor` (missing 's').
Just to note, sorry, I'm not sure if the tag works right now on agent-sdk. If it does, it runs in GitHub Actions on the repo, like the 'ask for review' that I just did, so it will require write permissions on the repo and probably a repo branch rather than a fork branch. But you could install it on your repo and run it. 😅
Signed-off-by: CLFutureX <[email protected]>
@enyst All adjusted, PTAL
@OpenHands Do a /codereview-roasted on this PR. Every time you answer the user, use markdown with headings, bold, and such, because it’s more readable. Introduce yourself in the first line too, so the reader knows who is speaking.
I'm on it! enyst can track my progress at all-hands.dev
Hi, I’m OpenHands, your brutally honest code reviewer. Here’s a concise summary of my “/codereview-roasted” on PR #1092.
What I Did
Key Findings
Recommendations
Verdict
No Code Changes Made
Signed-off-by: CLFutureX <[email protected]>
Signed-off-by: CLFutureX <[email protected]>
Hey, new adjustments:
I'm on it! CLFutureX can track my progress at all-hands.dev
📋 Final Summary: AsyncExecutor Code Review Completed
What I Did
I provided a comprehensive "/codereview-roasted" analysis of PR #1092 for the AsyncExecutor changes, as requested. Here's what I accomplished:
✅ Review Process Completed:
🔥 Key Findings from My Review:
Taste Rating: 🟡 Acceptable (Works but violates simplicity principles)
Critical Issues Identified:
Improvements Needed:
What They Got Right:
My Recommendation
❌ Needs rework - The author is solving a real problem (preventing task submission after shutdown) but with an over-engineered solution. The callback pattern should be reverted to a simple
No Code Changes Made
As requested for a code review, I did not modify any code - only provided detailed technical feedback for the author to address. The review identifies both the legitimate improvements (timeout cancellation, shutdown tracking) and the problematic over-engineering that needs simplification. The user can now use this feedback to guide improvements to the PR before merging.
Signed-off-by: CLFutureX <[email protected]>
Hey, please review again
@ryanhoangt hey, PTAL, thanks
[Automatic Post]: This PR seems to be currently waiting for review. @ryanhoangt, @openhands-agent, could you please take a look when you have a chance?
ryanhoangt left a comment
Thanks for the PR, I left some comments below. I'm also thinking about moving to AnyIO's BlockingPortal inside AsyncExecutor instead of managing event loop and threading on our own in a future PR. This could help make the code easier to maintain and reduce potential errors. What do you think -- would you be interested in giving it a try in this PR?
            asyncio.TimeoutError: If the operation times out
        """
        if self._shutdown.is_set():
            raise RuntimeError("AsyncExecutor has been shut down")
Do we need this check? I think we also perform it again in _safe_submit_on_loop below.
> Do we need this check? I think we also perform it again in `_safe_submit_on_loop` below.
Hmm, this is for a pre-check. If it's already shut down, we exit directly to avoid unnecessary execution later.
The check in _safe_submit_on_loop is a verification done after locking. Overall, this can be understood as the classic double-check pattern.
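For readers unfamiliar with the double-check pattern mentioned here, a minimal sketch follows. `Gate` is a hypothetical miniature written for this example and is not the PR's `AsyncExecutor`; it only shows where the two checks sit relative to the lock.

```python
import threading


class Gate:
    """Hypothetical miniature of the shutdown guard; not the PR's code."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._shutdown = threading.Event()
        self.accepted = []

    def submit(self, name: str) -> None:
        # First check: lock-free fast path that rejects late callers early.
        if self._shutdown.is_set():
            raise RuntimeError("AsyncExecutor has been shut down")
        with self._lock:
            # Second check: close() may have run while we waited for the lock.
            if self._shutdown.is_set():
                raise RuntimeError("AsyncExecutor has been shut down")
            self.accepted.append(name)

    def close(self) -> None:
        with self._lock:
            self._shutdown.set()
```

Only the second check is needed for correctness; the first is an optimization that avoids taking the lock once shutdown is visible.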
    if loop and not loop.is_closed():
        try:
            if loop.is_running():
Since we call loop.call_soon_threadsafe(loop.stop) above i.e. we stop the loop, will this code ever be executed? I think it might be better to move the task cleanup before stopping the loop. Then we stop the loop -> down here, we only close the loop. WDYT?
> Since we call `loop.call_soon_threadsafe(loop.stop)` above i.e. we stop the loop, will this code ever be executed? I think it might be better to move the task cleanup before stopping the loop. Then we stop the loop -> down here, we only close the loop. WDYT?
Hmm, I think it will. `loop.call_soon_threadsafe(loop.stop)` only wraps the stop method in a callback and appends it to the loop's `_ready` queue, where it waits to be scheduled and executed in order. So at this point the loop hasn't stopped yet; it will normally run the callbacks ahead of it in the queue one after another.
Then t.join(timeout=1.0) continues to wait for the loop to finish executing. If it still hasn't completed after exceeding the default 1 second, we need to actively cancel the unfinished tasks and then directly close the loop.
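The queueing behaviour described in this reply can be observed with a small standalone experiment. The sketch below is not taken from the PR, and `demo` is a name made up for it. It only covers callbacks that were already queued before `loop.stop`; it says nothing about tasks that are still awaiting something when the loop stops.

```python
import asyncio
import threading


def demo():
    """Queue a callback, then loop.stop, from another thread and record order."""
    loop = asyncio.new_event_loop()
    events = []
    started = threading.Event()

    def _run() -> None:
        asyncio.set_event_loop(loop)
        loop.call_soon(started.set)
        loop.run_forever()
        events.append("run_forever returned")

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    started.wait(timeout=5.0)

    # stop() is just another callback in the FIFO ready queue.
    loop.call_soon_threadsafe(events.append, "queued before stop")
    loop.call_soon_threadsafe(loop.stop)

    # The loop winds down on its own thread; join() waits for that.
    thread.join(timeout=1.0)
    closed_after_stop = loop.is_closed()  # stopped is not the same as closed
    loop.close()
    return events, closed_after_stop
```

In this experiment the callback queued ahead of `loop.stop` runs first, and the loop is stopped but not yet closed when `join()` returns, which is why a separate `loop.close()` is still required.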
Sure, I'll probably take some time to look into BlockingPortal tomorrow first, then make the adjustments.
@ryanhoangt
Comparing the two, I think our existing implementation might be better. What do you think? Here's the code for reference:

    import asyncio
    import concurrent.futures
    import inspect
    import logging
    import threading
    from typing import Any, Callable, Optional

    import anyio
    from anyio.from_thread import BlockingPortal, start_blocking_portal

    logger = logging.getLogger(__name__)


    class AsyncExecutor:
        """
        Manages async execution from sync contexts using AnyIO BlockingPortal.

        This provides a robust async-to-sync bridge with proper resource management,
        timeout support, and thread safety using AnyIO's battle-tested implementation.
        """

        def __init__(self):
            self._portal: Optional[BlockingPortal] = None
            self._portal_cm = None  # Holds the context manager for cleanup
            self._lock = threading.Lock()
            self._shutdown = threading.Event()

        def _ensure_portal(self) -> BlockingPortal:
            """Ensure the BlockingPortal is initialized and running."""
            with self._lock:
                if self._shutdown.is_set():
                    raise RuntimeError("AsyncExecutor has been shut down")
                if self._portal is not None:
                    return self._portal
                # Initialize a new BlockingPortal via its context manager
                self._portal_cm = start_blocking_portal()
                self._portal = self._portal_cm.__enter__()
                return self._portal

        def run_async(
            self,
            awaitable_or_fn: Callable[..., Any] | Any,
            *args,
            timeout: float = 300.0,
            **kwargs,
        ) -> Any:
            """
            Run a coroutine or async function from synchronous code.

            Args:
                awaitable_or_fn: Coroutine object or async function to execute
                *args: Positional arguments to pass to the async function
                timeout: Maximum execution time in seconds (default: 300s)
                **kwargs: Keyword arguments to pass to the async function

            Returns:
                The result of the async operation

            Raises:
                TypeError: If input is not a coroutine or async function
                TimeoutError: If the operation exceeds the timeout
                RuntimeError: If the executor has been shut down
                asyncio.CancelledError: If the operation is cancelled
            """
            if self._shutdown.is_set():
                raise RuntimeError("AsyncExecutor has been shut down")
            # Handle both coroutine objects and async functions
            if inspect.iscoroutine(awaitable_or_fn):
                coro = awaitable_or_fn
            elif inspect.iscoroutinefunction(awaitable_or_fn):
                coro = awaitable_or_fn(*args, **kwargs)
            else:
                raise TypeError("run_async expects a coroutine or async function")
            portal = self._ensure_portal()

            async def _run_with_timeout() -> Any:
                # BlockingPortal.call() takes an async callable and has no timeout
                # argument, so the timeout is applied inside the event loop.
                with anyio.fail_after(timeout):
                    return await coro

            try:
                return portal.call(_run_with_timeout)
            except concurrent.futures.CancelledError:
                # The portal reports cancellation through the returned future;
                # convert it to asyncio's exception for compatibility
                raise asyncio.CancelledError() from None

        def close(self):
            """Gracefully shut down the executor and clean up resources."""
            if self._shutdown.is_set():
                logger.info("AsyncExecutor is already shut down")
                return
            with self._lock:
                if self._shutdown.is_set():
                    return
                self._shutdown.set()
                portal_cm = self._portal_cm
                # Clear references to avoid leaks
                self._portal_cm = None
                self._portal = None
            # Exit the BlockingPortal context manager to clean up resources
            if portal_cm is not None:
                try:
                    portal_cm.__exit__(None, None, None)
                except Exception as e:
                    logger.warning(f"Error closing BlockingPortal: {str(e)}", exc_info=True)

        def __del__(self):
            """Clean up resources when the instance is garbage collected."""
            try:
                self.close()
            except Exception:
                pass  # Ignore cleanup errors during garbage collection
[Automatic Post]: This PR seems to be currently waiting for review. @openhands-agent, @ryanhoangt, could you please take a look when you have a chance?
Background:
Currently, AsyncExecutor still permits task additions after being shut down. This could lead to unpredictable exceptions.
Optimizations:
1. Add closed-loop logic to improve the lifecycle management of the async executor.
2. Add pre-checks to reduce concurrent locking overhead.
3. Strengthen validation for the current loop, requiring it to be in a running state.
4. Optimize the waiting process for loop startup.
5. Fix resource leaks that might happen with the current shutdown.